diff --git a/Cargo.lock b/Cargo.lock index 72fcde7bd5..07d429a777 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4126,7 +4126,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=5e422e518b6755c46ec8e6f26233c433ff016558#5e422e518b6755c46ec8e6f26233c433ff016558" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=7855e1fcc806a795e5e714297fa65083b1110dce#7855e1fcc806a795e5e714297fa65083b1110dce" dependencies = [ "prost", "serde", @@ -5510,7 +5510,7 @@ dependencies = [ "datafusion-common", "datatypes", "futures", - "greptime-proto 0.1.0 (git+https://github.com/GreptimeTeam/greptime-proto.git?rev=5e422e518b6755c46ec8e6f26233c433ff016558)", + "greptime-proto 0.1.0 (git+https://github.com/GreptimeTeam/greptime-proto.git?rev=7855e1fcc806a795e5e714297fa65083b1110dce)", "lazy_static", "log-store", "metrics", diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index a6da092ff2..817c6ebf64 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -32,8 +32,8 @@ datafusion.workspace = true datafusion-common.workspace = true datatypes = { path = "../datatypes" } futures.workspace = true -# TODO(yingwen): Update and use api crate once https://github.com/GreptimeTeam/greptime-proto/pull/72 is merged. -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "5e422e518b6755c46ec8e6f26233c433ff016558" } +# TODO(yingwen): Update and use api crate once https://github.com/GreptimeTeam/greptime-proto/pull/75 is merged. +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "7855e1fcc806a795e5e714297fa65083b1110dce" } lazy_static = "1.4" log-store = { path = "../log-store" } metrics.workspace = true diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index fc9ef43946..6f68646752 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -26,7 +26,9 @@ use store_api::storage::RegionId; use crate::config::MitoConfig; use crate::error::{RecvSnafu, Result}; -use crate::request::{CloseRequest, CreateRequest, OpenRequest, RegionRequest, RequestBody, WriteRequest}; +use crate::request::{ + CloseRequest, CreateRequest, OpenRequest, RegionRequest, RequestBody, WriteRequest, +}; use crate::worker::WorkerGroup; /// Region engine implementation for timeseries data. @@ -88,7 +90,8 @@ impl MitoEngine { pub async fn write_region(&self, write_request: WriteRequest) -> Result<()> { write_request.validate()?; - self.inner.handle_request_body(RequestBody::Write(write_request)) + self.inner + .handle_request_body(RequestBody::Write(write_request)) .await } } diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 5087969456..d65db0d5f1 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -174,6 +174,13 @@ pub enum Error { reason: String, location: Location, }, + + #[snafu(display("Invalid request to region {}, reason: {}", region_id, reason))] + InvalidRequest { + region_id: RegionId, + reason: String, + location: Location, + }, } pub type Result = std::result::Result; @@ -194,9 +201,10 @@ impl ErrorExt for Error { | NewRecordBatch { .. } | RegionNotFound { .. } | RegionCorrupted { .. } => StatusCode::Unexpected, - InvalidScanIndex { .. } | InvalidMeta { .. } | InvalidSchema { .. } => { - StatusCode::InvalidArguments - } + InvalidScanIndex { .. } + | InvalidMeta { .. } + | InvalidSchema { .. } + | InvalidRequest { .. } => StatusCode::InvalidArguments, RegionMetadataNotFound { .. } | Join { .. } | WorkerStopped { .. } | Recv { .. } => { StatusCode::Internal } diff --git a/src/mito2/src/metadata.rs b/src/mito2/src/metadata.rs index f201a88002..a78cbf528b 100644 --- a/src/mito2/src/metadata.rs +++ b/src/mito2/src/metadata.rs @@ -221,6 +221,50 @@ impl RegionMetadata { } } +/// Fields skipped in serialization. +struct SkippedFields { + /// Last schema. + schema: SchemaRef, + /// Id of the time index column. + time_index: ColumnId, + /// Map column id to column's index in [column_metadatas](RegionMetadata::column_metadatas). + id_to_index: HashMap, +} + +impl SkippedFields { + /// Constructs skipped fields from `column_metadatas`. + fn new(column_metadatas: &[ColumnMetadata]) -> Result { + let column_schemas = column_metadatas + .iter() + .map(|column_metadata| column_metadata.column_schema.clone()) + .collect(); + let schema = Arc::new(Schema::try_new(column_schemas).context(InvalidSchemaSnafu)?); + let time_index = column_metadatas + .iter() + .find_map(|col| { + if col.semantic_type == SemanticType::Timestamp { + Some(col.column_id) + } else { + None + } + }) + .context(InvalidMetaSnafu { + reason: "time index not found", + })?; + let id_to_index = column_metadatas + .iter() + .enumerate() + .map(|(idx, col)| (col.column_id, idx)) + .collect(); + + Ok(SkippedFields { + schema, + time_index, + id_to_index, + }) + } +} + /// Builder to build [RegionMetadata]. pub struct RegionMetadataBuilder { region_id: RegionId, @@ -310,14 +354,14 @@ impl ColumnMetadata { } /// The semantic type of one column -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] pub enum SemanticType { /// Tag column, also is a part of primary key. - Tag, + Tag = 0, /// A column that isn't a time index or part of primary key. - Field, + Field = 1, /// Time index column. - Timestamp, + Timestamp = 2, } /// Fields skipped in serialization. diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index c0266ff0df..2be5ddf247 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -15,7 +15,7 @@ //! Mito region. pub(crate) mod opener; -mod version; +pub(crate) mod version; use std::collections::HashMap; use std::sync::{Arc, RwLock}; @@ -25,7 +25,7 @@ use store_api::storage::RegionId; use crate::error::Result; use crate::manifest::manager::RegionManifestManager; -use crate::region::version::VersionControlRef; +use crate::region::version::{VersionControlRef, VersionRef}; /// Type to store region version. pub type VersionNumber = u32; @@ -56,6 +56,11 @@ impl MitoRegion { Ok(()) } + + /// Returns current version of the region. + pub(crate) fn version(&self) -> VersionRef { + self.version_control.current() + } } /// Regions indexed by ids. diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index d9c5e9ecec..54fe29df3c 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -26,7 +26,6 @@ use std::sync::Arc; use arc_swap::ArcSwap; -use store_api::manifest::ManifestVersion; use store_api::storage::SequenceNumber; use crate::memtable::version::{MemtableVersion, MemtableVersionRef}; @@ -48,6 +47,11 @@ impl VersionControl { version: ArcSwap::new(Arc::new(version)), } } + + /// Returns current [Version]. + pub(crate) fn current(&self) -> VersionRef { + self.version.load_full() + } } pub(crate) type VersionControlRef = Arc; @@ -59,21 +63,20 @@ pub(crate) struct Version { /// /// Altering metadata isn't frequent, storing metadata in Arc to allow sharing /// metadata and reuse metadata when creating a new `Version`. - metadata: RegionMetadataRef, + pub(crate) metadata: RegionMetadataRef, /// Mutable and immutable memtables. /// /// Wrapped in Arc to make clone of `Version` much cheaper. - memtables: MemtableVersionRef, + pub(crate) memtables: MemtableVersionRef, /// SSTs of the region. - ssts: SstVersionRef, + pub(crate) ssts: SstVersionRef, /// Inclusive max sequence of flushed data. - flushed_sequence: SequenceNumber, - // TODO(yingwen): Remove this. - /// Current version of region manifest. - manifest_version: ManifestVersion, + pub(crate) flushed_sequence: SequenceNumber, // TODO(yingwen): RegionOptions. } +pub(crate) type VersionRef = Arc; + /// Version builder. pub(crate) struct VersionBuilder { metadata: RegionMetadataRef, @@ -94,7 +97,6 @@ impl VersionBuilder { memtables: Arc::new(MemtableVersion::new(self.mutable)), ssts: Arc::new(SstVersion::new()), flushed_sequence: 0, - manifest_version: 0, } } } diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index a81d8c9d54..d1dfd29701 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -83,33 +83,34 @@ pub struct CloseRequest { pub region_id: RegionId, } -/// Mutation to apply to a set of rows. -#[derive(Debug)] -pub struct Mutation { - /// Type of the mutation. - pub op_type: OpType, - /// Rows to write. - pub rows: Rows, -} - /// Request to write a region. #[derive(Debug)] pub struct WriteRequest { /// Region to write. pub region_id: RegionId, - /// Mutation to the region. - pub mutation: Mutation, + /// Type of the write request. + pub op_type: OpType, + /// Rows to write. + pub rows: Rows, } impl WriteRequest { /// Validate the request. pub(crate) fn validate(&self) -> Result<()> { - // 1. checks whether the request is too large. - // 2. checks whether each row in rows has the same schema. + // - checks whether the request is too large. + // - checks whether each row in rows has the same schema. + // - checks rows don't have duplicate columns. unimplemented!() } } +/// Sender and write request. +pub(crate) struct SenderWriteRequest { + /// Result sender. + pub(crate) sender: Option>>, + pub(crate) request: WriteRequest, +} + /// Request sent to a worker pub(crate) enum WorkerRequest { /// Region request. @@ -175,4 +176,15 @@ impl RequestBody { pub(crate) fn is_write(&self) -> bool { matches!(self, RequestBody::Write(_)) } + + /// Converts the request into a [WriteRequest]. + /// + /// # Panics + /// Panics if it isn't a [WriteRequest]. + pub(crate) fn into_write_request(self) -> WriteRequest { + match self { + RequestBody::Write(req) => req, + other => panic!("expect write request, found {other:?}"), + } + } } diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 23fba45f29..dd26f9ac39 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -38,7 +38,7 @@ use crate::config::MitoConfig; use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu}; use crate::memtable::{DefaultMemtableBuilder, MemtableBuilderRef}; use crate::region::{RegionMap, RegionMapRef}; -use crate::request::{RegionRequest, RequestBody, WorkerRequest}; +use crate::request::{RegionRequest, RequestBody, SenderWriteRequest, WorkerRequest}; /// Identifier for a worker. pub(crate) type WorkerId = u32; @@ -329,7 +329,10 @@ impl RegionWorkerLoop { match worker_req { WorkerRequest::Region(req) => { if req.body.is_write() { - write_requests.push(req); + write_requests.push(SenderWriteRequest { + sender: req.sender, + request: req.body.into_write_request(), + }); } else { ddl_requests.push(req); } diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index 2f26bd63e6..e8bdb4f9fc 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -14,18 +14,59 @@ //! Handling write requests. -use crate::{worker::RegionWorkerLoop, request::{RegionRequest}}; +use std::collections::HashMap; + +use datatypes::prelude::ConcreteDataType; +use datatypes::types::{TimeType, TimestampType}; +use greptime_proto::v1::mito::Mutation; +use greptime_proto::v1::{ColumnDataType, Rows}; +use snafu::ensure; +use tokio::sync::oneshot::Sender; + +use crate::error::{InvalidRequestSnafu, RegionNotFoundSnafu, Result}; +use crate::metadata::SemanticType; +use crate::region::version::VersionRef; +use crate::region::MitoRegionRef; +use crate::request::SenderWriteRequest; +use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { /// Takes and handles all write requests. - /// - /// # Panics - /// Panics if `write_requests` contains a request whose body is a [WriteRequest]. - pub(crate) async fn handle_write_requests(&mut self, write_requests: Vec) { + pub(crate) async fn handle_write_requests(&mut self, write_requests: Vec) { if write_requests.is_empty() { return; } + let mut region_ctxs = HashMap::new(); + for sender_req in write_requests { + let region_id = sender_req.request.region_id; + // Checks whether the region exists. + if !region_ctxs.contains_key(®ion_id) { + let Some(region) = self.regions.get_region(region_id) else { + // No such region. + send_result(sender_req.sender, RegionNotFoundSnafu { + region_id, + }.fail()); + + continue; + }; + + // Initialize the context. + region_ctxs.insert(region_id, RegionWriteCtx::new(region)); + } + + // Safety: Now we ensure the region exists. + let region_ctx = region_ctxs.get_mut(®ion_id).unwrap(); + + // Checks request schema. + if let Err(e) = region_ctx.check_schema(&sender_req.request.rows) { + send_result(sender_req.sender, Err(e)); + + continue; + } + + // Push request. + } // We need to check: // - region exists, if not, return error // - check whether the schema is compatible with region schema @@ -41,7 +82,166 @@ impl RegionWorkerLoop { } } +/// Send result to the request. +fn send_result(sender: Option>>, res: Result<()>) { + if let Some(sender) = sender { + // Ignore send result. + let _ = sender.send(res); + } +} +/// Context to write to a region. +struct RegionWriteCtx { + /// Region to write. + region: MitoRegionRef, + /// Version of the region while creating the context. + version: VersionRef, + /// Valid mutations. + mutations: Vec, + /// Result senders. + /// + /// The sender is 1:1 map to the mutation in `mutations`. + senders: Vec>>>, +} + +impl RegionWriteCtx { + /// Returns an empty context. + fn new(region: MitoRegionRef) -> RegionWriteCtx { + let version = region.version(); + RegionWriteCtx { + region, + version, + mutations: Vec::new(), + senders: Vec::new(), + } + } + + /// Checks schema of rows. + fn check_schema(&self, rows: &Rows) -> Result<()> { + let region_id = self.region.region_id; + // Index all columns in rows. + let mut rows_columns: HashMap<_, _> = rows + .schema + .iter() + .map(|column| (&column.column_name, column)) + .collect(); + + // Checks all columns in this region. + for column in &self.version.metadata.column_metadatas { + if let Some(input_col) = rows_columns.remove(&column.column_schema.name) { + // Check data type. + ensure!( + check_column_type(input_col.datatype, &column.column_schema.data_type), + InvalidRequestSnafu { + region_id, + reason: format!( + "Column {} expect type {:?}, given: {:?}({})", + column.column_schema.name, + column.column_schema.data_type, + ColumnDataType::from_i32(input_col.datatype), + input_col.datatype, + ) + } + ); + + // Check semantic type. + ensure!( + check_semantic_type(input_col.semantic_type, column.semantic_type), + InvalidRequestSnafu { + region_id, + reason: format!( + "Column {} has semantic type {:?}, given: {:?}({})", + column.column_schema.name, + column.semantic_type, + greptime_proto::v1::SemanticType::from_i32(input_col.semantic_type), + input_col.semantic_type + ), + } + ); + } else { + // For columns not in rows, checks whether they are nullable. + ensure!( + column.column_schema.is_nullable() + || column.column_schema.default_constraint().is_some(), + InvalidRequestSnafu { + region_id, + reason: format!("Missing column {}", column.column_schema.name), + } + ); + } + } + + // Checks all columns in rows exist in the regino. + if !rows_columns.is_empty() { + let names: Vec<_> = rows_columns.into_keys().collect(); + return InvalidRequestSnafu { + region_id, + reason: format!("Unknown columns: {:?}", names), + } + .fail(); + } + + Ok(()) + } +} + +/// Returns true if the pb semantic type is valid. +fn check_semantic_type(type_value: i32, semantic_type: SemanticType) -> bool { + type_value == semantic_type as i32 +} + +/// Returns true if the pb type value is valid. +fn check_column_type(type_value: i32, expect_type: &ConcreteDataType) -> bool { + let Some(column_type) = ColumnDataType::from_i32(type_value) else { + return false; + }; + + is_column_type_eq(column_type, expect_type) +} + +/// Returns true if the column type is equal to exepcted type. +fn is_column_type_eq(column_type: ColumnDataType, expect_type: &ConcreteDataType) -> bool { + match (column_type, expect_type) { + (ColumnDataType::Boolean, ConcreteDataType::Boolean(_)) + | (ColumnDataType::Int8, ConcreteDataType::Int8(_)) + | (ColumnDataType::Int16, ConcreteDataType::Int16(_)) + | (ColumnDataType::Int32, ConcreteDataType::Int32(_)) + | (ColumnDataType::Int64, ConcreteDataType::Int64(_)) + | (ColumnDataType::Uint8, ConcreteDataType::UInt8(_)) + | (ColumnDataType::Uint16, ConcreteDataType::UInt16(_)) + | (ColumnDataType::Uint32, ConcreteDataType::UInt32(_)) + | (ColumnDataType::Uint64, ConcreteDataType::UInt64(_)) + | (ColumnDataType::Float32, ConcreteDataType::Float32(_)) + | (ColumnDataType::Float64, ConcreteDataType::Float64(_)) + | (ColumnDataType::Binary, ConcreteDataType::Binary(_)) + | (ColumnDataType::String, ConcreteDataType::String(_)) + | (ColumnDataType::Date, ConcreteDataType::Date(_)) + | (ColumnDataType::Datetime, ConcreteDataType::DateTime(_)) + | ( + ColumnDataType::TimestampSecond, + ConcreteDataType::Timestamp(TimestampType::Second(_)), + ) + | ( + ColumnDataType::TimestampMillisecond, + ConcreteDataType::Timestamp(TimestampType::Millisecond(_)), + ) + | ( + ColumnDataType::TimestampMicrosecond, + ConcreteDataType::Timestamp(TimestampType::Microsecond(_)), + ) + | ( + ColumnDataType::TimestampNanosecond, + ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)), + ) + | (ColumnDataType::TimeSecond, ConcreteDataType::Time(TimeType::Second(_))) + | (ColumnDataType::TimeMillisecond, ConcreteDataType::Time(TimeType::Millisecond(_))) + | (ColumnDataType::TimeMicrosecond, ConcreteDataType::Time(TimeType::Microsecond(_))) + | (ColumnDataType::TimeNanosecond, ConcreteDataType::Time(TimeType::Nanosecond(_))) => true, + _ => false, + } +} + +// sender // pb write message // region id // rows @@ -85,4 +285,3 @@ impl RegionWorkerLoop { // self.requests.is_empty() // } // } -