diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 6f68646752..e4a74527cb 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -90,6 +90,10 @@ impl MitoEngine { pub async fn write_region(&self, write_request: WriteRequest) -> Result<()> { write_request.validate()?; + // TODO(yingwen): Fill default values. + // We need to fill default values before writing it to WAL so we can get + // the same default value after reopening the region. + self.inner .handle_request_body(RequestBody::Write(write_request)) .await diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index d65db0d5f1..4b92522118 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -181,6 +181,30 @@ pub enum Error { reason: String, location: Location, }, + + /// An error type to indicate that schema is changed and we need + /// to fill default values again. + #[snafu(display( + "Need to fill default value to column {} of region {}", + column, + region_id + ))] + FillDefault { + region_id: RegionId, + column: String, + // The error is for retry purpose so we don't need a location. + }, + + #[snafu(display( + "Failed to create default value for column {} of region {}", + column, + region_id + ))] + CreateDefault { + region_id: RegionId, + column: String, + source: datatypes::Error, + }, } pub type Result<T> = std::result::Result<T, Error>; @@ -200,11 +224,13 @@ impl ErrorExt for Error { | RegionExists { .. } | NewRecordBatch { .. } | RegionNotFound { .. } - | RegionCorrupted { .. } => StatusCode::Unexpected, + | RegionCorrupted { .. } + | CreateDefault { .. } => StatusCode::Unexpected, InvalidScanIndex { .. } | InvalidMeta { .. } | InvalidSchema { .. } - | InvalidRequest { .. } => StatusCode::InvalidArguments, + | InvalidRequest { .. } + | FillDefault { .. } => StatusCode::InvalidArguments, RegionMetadataNotFound { .. } | Join { .. } | WorkerStopped { .. } | Recv { ..
} => { StatusCode::Internal } diff --git a/src/mito2/src/proto_util.rs b/src/mito2/src/proto_util.rs index 838bd36ba4..0b44835e43 100644 --- a/src/mito2/src/proto_util.rs +++ b/src/mito2/src/proto_util.rs @@ -119,44 +119,61 @@ pub(crate) fn to_proto_value(value: Value) -> Option { Some(proto_value) } -/// Returns true if the column type is equal to exepcted type. -fn is_column_type_eq(column_type: ColumnDataType, expect_type: &ConcreteDataType) -> bool { - match (column_type, expect_type) { - (ColumnDataType::Boolean, ConcreteDataType::Boolean(_)) - | (ColumnDataType::Int8, ConcreteDataType::Int8(_)) - | (ColumnDataType::Int16, ConcreteDataType::Int16(_)) - | (ColumnDataType::Int32, ConcreteDataType::Int32(_)) - | (ColumnDataType::Int64, ConcreteDataType::Int64(_)) - | (ColumnDataType::Uint8, ConcreteDataType::UInt8(_)) - | (ColumnDataType::Uint16, ConcreteDataType::UInt16(_)) - | (ColumnDataType::Uint32, ConcreteDataType::UInt32(_)) - | (ColumnDataType::Uint64, ConcreteDataType::UInt64(_)) - | (ColumnDataType::Float32, ConcreteDataType::Float32(_)) - | (ColumnDataType::Float64, ConcreteDataType::Float64(_)) - | (ColumnDataType::Binary, ConcreteDataType::Binary(_)) - | (ColumnDataType::String, ConcreteDataType::String(_)) - | (ColumnDataType::Date, ConcreteDataType::Date(_)) - | (ColumnDataType::Datetime, ConcreteDataType::DateTime(_)) - | ( - ColumnDataType::TimestampSecond, - ConcreteDataType::Timestamp(TimestampType::Second(_)), - ) - | ( - ColumnDataType::TimestampMillisecond, - ConcreteDataType::Timestamp(TimestampType::Millisecond(_)), - ) - | ( - ColumnDataType::TimestampMicrosecond, - ConcreteDataType::Timestamp(TimestampType::Microsecond(_)), - ) - | ( - ColumnDataType::TimestampNanosecond, - ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)), - ) - | (ColumnDataType::TimeSecond, ConcreteDataType::Time(TimeType::Second(_))) - | (ColumnDataType::TimeMillisecond, ConcreteDataType::Time(TimeType::Millisecond(_))) - | (ColumnDataType::TimeMicrosecond, 
ConcreteDataType::Time(TimeType::Microsecond(_))) - | (ColumnDataType::TimeNanosecond, ConcreteDataType::Time(TimeType::Nanosecond(_))) => true, - _ => false, +/// Convert [ConcreteDataType] to [ColumnDataType]. +pub(crate) fn to_column_data_type(data_type: &ConcreteDataType) -> Option<ColumnDataType> { + let column_data_type = match data_type { + ConcreteDataType::Boolean(_) => ColumnDataType::Boolean, + ConcreteDataType::Int8(_) => ColumnDataType::Int8, + ConcreteDataType::Int16(_) => ColumnDataType::Int16, + ConcreteDataType::Int32(_) => ColumnDataType::Int32, + ConcreteDataType::Int64(_) => ColumnDataType::Int64, + ConcreteDataType::UInt8(_) => ColumnDataType::Uint8, + ConcreteDataType::UInt16(_) => ColumnDataType::Uint16, + ConcreteDataType::UInt32(_) => ColumnDataType::Uint32, + ConcreteDataType::UInt64(_) => ColumnDataType::Uint64, + ConcreteDataType::Float32(_) => ColumnDataType::Float32, + ConcreteDataType::Float64(_) => ColumnDataType::Float64, + ConcreteDataType::Binary(_) => ColumnDataType::Binary, + ConcreteDataType::String(_) => ColumnDataType::String, + ConcreteDataType::Date(_) => ColumnDataType::Date, + ConcreteDataType::DateTime(_) => ColumnDataType::Datetime, + ConcreteDataType::Timestamp(TimestampType::Second(_)) => ColumnDataType::TimestampSecond, + ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => { + ColumnDataType::TimestampMillisecond + } + ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => { + ColumnDataType::TimestampMicrosecond + } + ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => { + ColumnDataType::TimestampNanosecond + } + ConcreteDataType::Time(TimeType::Second(_)) => ColumnDataType::TimeSecond, + ConcreteDataType::Time(TimeType::Millisecond(_)) => ColumnDataType::TimeMillisecond, + ConcreteDataType::Time(TimeType::Microsecond(_)) => ColumnDataType::TimeMicrosecond, + ConcreteDataType::Time(TimeType::Nanosecond(_)) => ColumnDataType::TimeNanosecond, + ConcreteDataType::Null(_) + | ConcreteDataType::Interval(_) + |
ConcreteDataType::List(_) + | ConcreteDataType::Dictionary(_) => return None, + }; + + Some(column_data_type) +} + +/// Convert semantic type to proto's semantic type +pub(crate) fn to_proto_semantic_type(semantic_type: SemanticType) -> v1::SemanticType { + match semantic_type { + SemanticType::Tag => v1::SemanticType::Tag, + SemanticType::Field => v1::SemanticType::Field, + SemanticType::Timestamp => v1::SemanticType::Timestamp, + } +} + +/// Returns true if the column type is equal to expected type. +fn is_column_type_eq(column_type: ColumnDataType, expect_type: &ConcreteDataType) -> bool { + if let Some(expect) = to_column_data_type(expect_type) { + column_type == expect + } else { + false } } diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index d80c06e711..e59176becc 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -18,15 +18,18 @@ use std::collections::HashMap; use std::time::Duration; use common_base::readable_size::ReadableSize; -use greptime_proto::v1::{ColumnDataType, Rows}; -use snafu::ensure; +use greptime_proto::v1::{ColumnDataType, ColumnSchema, Rows}; +use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::{ColumnId, CompactionStrategy, OpType, RegionId}; use tokio::sync::oneshot::{self, Receiver, Sender}; use crate::config::DEFAULT_WRITE_BUFFER_SIZE; -use crate::error::{InvalidRequestSnafu, Result}; +use crate::error::{CreateDefaultSnafu, FillDefaultSnafu, InvalidRequestSnafu, Result}; use crate::metadata::{ColumnMetadata, RegionMetadata}; -use crate::proto_util::{check_column_type, check_semantic_type}; +use crate::proto_util::{ + check_column_type, check_semantic_type, to_column_data_type, to_proto_semantic_type, + to_proto_value, +}; /// Options that affect the entire region. /// @@ -95,18 +98,40 @@ pub struct WriteRequest { pub op_type: OpType, /// Rows to write. pub rows: Rows, + /// Map column name to column index in `rows`.
+ name_to_index: HashMap<String, usize>, } impl WriteRequest { + /// Returns a new request. + pub fn new(region_id: RegionId, op_type: OpType, rows: Rows) -> WriteRequest { + let name_to_index = rows + .schema + .iter() + .enumerate() + .map(|(index, column)| (column.column_name.clone(), index)) + .collect(); + WriteRequest { + region_id, + op_type, + rows, + name_to_index, + } + } + /// Validate the request. pub(crate) fn validate(&self) -> Result<()> { // - checks whether the request is too large. // - checks whether each row in rows has the same schema. + // - checks whether each column matches the schema in Rows. // - checks rows don't have duplicate columns. unimplemented!() } /// Checks schema of rows. + /// + /// If column with default value is missing, it returns a special [FillDefault](crate::error::Error::FillDefault) + /// error. pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> { let region_id = self.region_id; // Index all columns in rows. @@ -126,7 +151,7 @@ impl WriteRequest { InvalidRequestSnafu { region_id, reason: format!( - "Column {} expect type {:?}, given: {:?}({})", + "column {} expect type {:?}, given: {:?}({})", column.column_schema.name, column.column_schema.data_type, ColumnDataType::from_i32(input_col.datatype), @@ -141,7 +166,7 @@ impl WriteRequest { InvalidRequestSnafu { region_id, reason: format!( - "Column {} has semantic type {:?}, given: {:?}({})", + "column {} has semantic type {:?}, given: {:?}({})", column.column_schema.name, column.semantic_type, greptime_proto::v1::SemanticType::from_i32(input_col.semantic_type), @@ -150,15 +175,21 @@ impl WriteRequest { } ); } else { - // For columns not in rows, checks whether they are nullable. + // For columns not in rows, checks whether they have default value.
ensure!( column.column_schema.is_nullable() || column.column_schema.default_constraint().is_some(), InvalidRequestSnafu { region_id, - reason: format!("Missing column {}", column.column_schema.name), + reason: format!("missing column {}", column.column_schema.name), } ); + + return FillDefaultSnafu { + region_id, + column: &column.column_schema.name, + } + .fail(); } } @@ -167,13 +198,79 @@ impl WriteRequest { let names: Vec<_> = rows_columns.into_keys().collect(); return InvalidRequestSnafu { region_id, - reason: format!("Unknown columns: {:?}", names), + reason: format!("unknown columns: {:?}", names), } .fail(); } Ok(()) } + + /// Try to fill missing columns. + /// + /// Currently, our protobuf format might be inefficient when we need to fill lots of null + /// values. + pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> { + for column in &metadata.column_metadatas { + if !self.name_to_index.contains_key(&column.column_schema.name) { + self.fill_column(metadata.region_id, &column)?; + } + } + + Ok(()) + } + + /// Fill default value for specific `column`. + fn fill_column(&mut self, region_id: RegionId, column: &ColumnMetadata) -> Result<()> { + // Need to add a default value for this column. + let default_value = column + .column_schema + .create_default() + .context(CreateDefaultSnafu { + region_id, + column: &column.column_schema.name, + })? + // This column doesn't have default value. + .with_context(|| InvalidRequestSnafu { + region_id, + reason: format!( + "column {} does not have default value", + column.column_schema.name + ), + })?; + + // Convert default value into proto's value. + let proto_value = to_proto_value(default_value).with_context(|| InvalidRequestSnafu { + region_id, + reason: format!( + "no protobuf type for default value of column {} ({:?})", + column.column_schema.name, column.column_schema.data_type + ), + })?; + + // Insert default value to each row. 
+ for row in &mut self.rows.rows { + row.values.push(proto_value.clone()); + } + + // Insert column schema. + let datatype = to_column_data_type(&column.column_schema.data_type).with_context(|| { + InvalidRequestSnafu { + region_id, + reason: format!( + "no protobuf type for column {} ({:?})", + column.column_schema.name, column.column_schema.data_type + ), + } + })?; + self.rows.schema.push(ColumnSchema { + column_name: column.column_schema.name.clone(), + datatype: datatype as i32, + semantic_type: to_proto_semantic_type(column.semantic_type) as i32, + }); + + Ok(()) + } } /// Sender and write request.