feat: fill default values

This commit is contained in:
evenyag
2023-08-02 21:56:25 +08:00
parent 4d55ed1e55
commit 97b0574598
4 changed files with 194 additions and 50 deletions

View File

@@ -90,6 +90,10 @@ impl MitoEngine {
pub async fn write_region(&self, write_request: WriteRequest) -> Result<()> {
write_request.validate()?;
// TODO(yingwen): Fill default values.
// We need to fill default values before writing it to WAL so we can get
// the same default value after reopening the region.
self.inner
.handle_request_body(RequestBody::Write(write_request))
.await

View File

@@ -181,6 +181,30 @@ pub enum Error {
reason: String,
location: Location,
},
/// An error type to indicate that schema is changed and we need
/// to fill default values again.
#[snafu(display(
"Need to fill default value to column {} of region {}",
column,
region_id
))]
FillDefault {
region_id: RegionId,
column: String,
// The error is for retry purpose so we don't need a location.
},
#[snafu(display(
"Failed to create default value for column {} of region {}",
column,
region_id
))]
CreateDefault {
region_id: RegionId,
column: String,
source: datatypes::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -200,11 +224,13 @@ impl ErrorExt for Error {
| RegionExists { .. }
| NewRecordBatch { .. }
| RegionNotFound { .. }
| RegionCorrupted { .. } => StatusCode::Unexpected,
| RegionCorrupted { .. }
| CreateDefault { .. } => StatusCode::Unexpected,
InvalidScanIndex { .. }
| InvalidMeta { .. }
| InvalidSchema { .. }
| InvalidRequest { .. } => StatusCode::InvalidArguments,
| InvalidRequest { .. }
| FillDefault { .. } => StatusCode::InvalidArguments,
RegionMetadataNotFound { .. } | Join { .. } | WorkerStopped { .. } | Recv { .. } => {
StatusCode::Internal
}

View File

@@ -119,44 +119,61 @@ pub(crate) fn to_proto_value(value: Value) -> Option<v1::Value> {
Some(proto_value)
}
/// Returns true if the column type is equal to the expected type.
fn is_column_type_eq(column_type: ColumnDataType, expect_type: &ConcreteDataType) -> bool {
match (column_type, expect_type) {
(ColumnDataType::Boolean, ConcreteDataType::Boolean(_))
| (ColumnDataType::Int8, ConcreteDataType::Int8(_))
| (ColumnDataType::Int16, ConcreteDataType::Int16(_))
| (ColumnDataType::Int32, ConcreteDataType::Int32(_))
| (ColumnDataType::Int64, ConcreteDataType::Int64(_))
| (ColumnDataType::Uint8, ConcreteDataType::UInt8(_))
| (ColumnDataType::Uint16, ConcreteDataType::UInt16(_))
| (ColumnDataType::Uint32, ConcreteDataType::UInt32(_))
| (ColumnDataType::Uint64, ConcreteDataType::UInt64(_))
| (ColumnDataType::Float32, ConcreteDataType::Float32(_))
| (ColumnDataType::Float64, ConcreteDataType::Float64(_))
| (ColumnDataType::Binary, ConcreteDataType::Binary(_))
| (ColumnDataType::String, ConcreteDataType::String(_))
| (ColumnDataType::Date, ConcreteDataType::Date(_))
| (ColumnDataType::Datetime, ConcreteDataType::DateTime(_))
| (
ColumnDataType::TimestampSecond,
ConcreteDataType::Timestamp(TimestampType::Second(_)),
)
| (
ColumnDataType::TimestampMillisecond,
ConcreteDataType::Timestamp(TimestampType::Millisecond(_)),
)
| (
ColumnDataType::TimestampMicrosecond,
ConcreteDataType::Timestamp(TimestampType::Microsecond(_)),
)
| (
ColumnDataType::TimestampNanosecond,
ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)),
)
| (ColumnDataType::TimeSecond, ConcreteDataType::Time(TimeType::Second(_)))
| (ColumnDataType::TimeMillisecond, ConcreteDataType::Time(TimeType::Millisecond(_)))
| (ColumnDataType::TimeMicrosecond, ConcreteDataType::Time(TimeType::Microsecond(_)))
| (ColumnDataType::TimeNanosecond, ConcreteDataType::Time(TimeType::Nanosecond(_))) => true,
_ => false,
/// Maps a [ConcreteDataType] to the matching protobuf [ColumnDataType].
///
/// Returns `None` for the data types that this function maps to no
/// protobuf column type (null, interval, list and dictionary).
pub(crate) fn to_column_data_type(data_type: &ConcreteDataType) -> Option<ColumnDataType> {
    match data_type {
        // Types this function does not map to any protobuf column type.
        ConcreteDataType::Null(_)
        | ConcreteDataType::Interval(_)
        | ConcreteDataType::List(_)
        | ConcreteDataType::Dictionary(_) => None,

        // Primitive types.
        ConcreteDataType::Boolean(_) => Some(ColumnDataType::Boolean),
        ConcreteDataType::Int8(_) => Some(ColumnDataType::Int8),
        ConcreteDataType::Int16(_) => Some(ColumnDataType::Int16),
        ConcreteDataType::Int32(_) => Some(ColumnDataType::Int32),
        ConcreteDataType::Int64(_) => Some(ColumnDataType::Int64),
        ConcreteDataType::UInt8(_) => Some(ColumnDataType::Uint8),
        ConcreteDataType::UInt16(_) => Some(ColumnDataType::Uint16),
        ConcreteDataType::UInt32(_) => Some(ColumnDataType::Uint32),
        ConcreteDataType::UInt64(_) => Some(ColumnDataType::Uint64),
        ConcreteDataType::Float32(_) => Some(ColumnDataType::Float32),
        ConcreteDataType::Float64(_) => Some(ColumnDataType::Float64),

        // Variable length types.
        ConcreteDataType::Binary(_) => Some(ColumnDataType::Binary),
        ConcreteDataType::String(_) => Some(ColumnDataType::String),

        // Date and time types.
        ConcreteDataType::Date(_) => Some(ColumnDataType::Date),
        ConcreteDataType::DateTime(_) => Some(ColumnDataType::Datetime),
        ConcreteDataType::Timestamp(unit) => Some(match unit {
            TimestampType::Second(_) => ColumnDataType::TimestampSecond,
            TimestampType::Millisecond(_) => ColumnDataType::TimestampMillisecond,
            TimestampType::Microsecond(_) => ColumnDataType::TimestampMicrosecond,
            TimestampType::Nanosecond(_) => ColumnDataType::TimestampNanosecond,
        }),
        ConcreteDataType::Time(unit) => Some(match unit {
            TimeType::Second(_) => ColumnDataType::TimeSecond,
            TimeType::Millisecond(_) => ColumnDataType::TimeMillisecond,
            TimeType::Microsecond(_) => ColumnDataType::TimeMicrosecond,
            TimeType::Nanosecond(_) => ColumnDataType::TimeNanosecond,
        }),
    }
}
/// Translates a [SemanticType] into the protobuf semantic type with the
/// same variant name.
pub(crate) fn to_proto_semantic_type(semantic_type: SemanticType) -> v1::SemanticType {
    // Local shorthand so the arms read as a plain one-to-one mapping.
    type Proto = v1::SemanticType;

    match semantic_type {
        SemanticType::Timestamp => Proto::Timestamp,
        SemanticType::Field => Proto::Field,
        SemanticType::Tag => Proto::Tag,
    }
}
/// Returns true if the column type is equal to the expected type.
///
/// An `expect_type` for which [to_column_data_type] returns `None` never
/// matches, so the result is `false` in that case.
fn is_column_type_eq(column_type: ColumnDataType, expect_type: &ConcreteDataType) -> bool {
    to_column_data_type(expect_type).map_or(false, |expected| column_type == expected)
}

View File

@@ -18,15 +18,18 @@ use std::collections::HashMap;
use std::time::Duration;
use common_base::readable_size::ReadableSize;
use greptime_proto::v1::{ColumnDataType, Rows};
use snafu::ensure;
use greptime_proto::v1::{ColumnDataType, ColumnSchema, Rows};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{ColumnId, CompactionStrategy, OpType, RegionId};
use tokio::sync::oneshot::{self, Receiver, Sender};
use crate::config::DEFAULT_WRITE_BUFFER_SIZE;
use crate::error::{InvalidRequestSnafu, Result};
use crate::error::{CreateDefaultSnafu, FillDefaultSnafu, InvalidRequestSnafu, Result};
use crate::metadata::{ColumnMetadata, RegionMetadata};
use crate::proto_util::{check_column_type, check_semantic_type};
use crate::proto_util::{
check_column_type, check_semantic_type, to_column_data_type, to_proto_semantic_type,
to_proto_value,
};
/// Options that affect the entire region.
///
@@ -95,18 +98,40 @@ pub struct WriteRequest {
pub op_type: OpType,
/// Rows to write.
pub rows: Rows,
/// Map column name to column index in `rows`.
name_to_index: HashMap<String, usize>,
}
impl WriteRequest {
/// Returns a new request.
pub fn new(region_id: RegionId, op_type: OpType, rows: Rows) -> WriteRequest {
let name_to_index = rows
.schema
.iter()
.enumerate()
.map(|(index, column)| (column.column_name.clone(), index))
.collect();
WriteRequest {
region_id,
op_type,
rows,
name_to_index,
}
}
/// Validate the request.
///
/// NOTE: this is still a stub. The body is `unimplemented!()`, so calling it
/// panics instead of returning a `Result`. The checks listed inside the body
/// describe the intended behavior and are not implemented yet.
pub(crate) fn validate(&self) -> Result<()> {
// Planned checks (none of them is implemented yet):
// - checks whether the request is too large.
// - checks whether each row in rows has the same schema.
// - checks whether each column match the schema in Rows.
// - checks rows don't have duplicate columns.
unimplemented!()
}
/// Checks schema of rows.
///
/// If column with default value is missing, it returns a special [FillDefault](crate::error::Error::FillDefault)
/// error.
pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> {
let region_id = self.region_id;
// Index all columns in rows.
@@ -126,7 +151,7 @@ impl WriteRequest {
InvalidRequestSnafu {
region_id,
reason: format!(
"Column {} expect type {:?}, given: {:?}({})",
"column {} expect type {:?}, given: {:?}({})",
column.column_schema.name,
column.column_schema.data_type,
ColumnDataType::from_i32(input_col.datatype),
@@ -141,7 +166,7 @@ impl WriteRequest {
InvalidRequestSnafu {
region_id,
reason: format!(
"Column {} has semantic type {:?}, given: {:?}({})",
"column {} has semantic type {:?}, given: {:?}({})",
column.column_schema.name,
column.semantic_type,
greptime_proto::v1::SemanticType::from_i32(input_col.semantic_type),
@@ -150,15 +175,21 @@ impl WriteRequest {
}
);
} else {
// For columns not in rows, checks whether they are nullable.
// For columns not in rows, checks whether they have default value.
ensure!(
column.column_schema.is_nullable()
|| column.column_schema.default_constraint().is_some(),
InvalidRequestSnafu {
region_id,
reason: format!("Missing column {}", column.column_schema.name),
reason: format!("missing column {}", column.column_schema.name),
}
);
return FillDefaultSnafu {
region_id,
column: &column.column_schema.name,
}
.fail();
}
}
@@ -167,13 +198,79 @@ impl WriteRequest {
let names: Vec<_> = rows_columns.into_keys().collect();
return InvalidRequestSnafu {
region_id,
reason: format!("Unknown columns: {:?}", names),
reason: format!("unknown columns: {:?}", names),
}
.fail();
}
Ok(())
}
/// Try to fill missing columns.
///
/// Every column in `metadata` whose name is not present in the request's name
/// index is appended to the request with its default value.
///
/// Currently, our protobuf format might be inefficient when we need to fill lots of null
/// values.
///
/// # Errors
///
/// Returns the first error produced while filling a column. Columns filled
/// before the failing one stay in the request.
pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
    for column in &metadata.column_metadatas {
        if !self.name_to_index.contains_key(&column.column_schema.name) {
            // `column` is already a `&ColumnMetadata`; pass it through without
            // taking another reference.
            self.fill_column(metadata.region_id, column)?;
        }
    }

    Ok(())
}
/// Fill default value for specific `column`.
///
/// Appends the default value of `column` to every row, appends the column to
/// the schema of `rows` and records its position in the name index.
///
/// # Errors
///
/// - `CreateDefault` if the default value cannot be created.
/// - `InvalidRequest` if the column has no default value, or if the default
///   value or the column data type cannot be converted to a protobuf type.
///
/// All fallible steps run before the request is modified, so the request is
/// left untouched when an error is returned.
fn fill_column(&mut self, region_id: RegionId, column: &ColumnMetadata) -> Result<()> {
    // Need to add a default value for this column.
    let default_value = column
        .column_schema
        .create_default()
        .context(CreateDefaultSnafu {
            region_id,
            column: &column.column_schema.name,
        })?
        // This column doesn't have default value.
        .with_context(|| InvalidRequestSnafu {
            region_id,
            reason: format!(
                "column {} does not have default value",
                column.column_schema.name
            ),
        })?;

    // Convert default value into proto's value.
    let proto_value = to_proto_value(default_value).with_context(|| InvalidRequestSnafu {
        region_id,
        reason: format!(
            "no protobuf type for default value of column {} ({:?})",
            column.column_schema.name, column.column_schema.data_type
        ),
    })?;

    // Resolve the protobuf data type before touching `rows`, otherwise a failure
    // here would leave rows with more values than the schema has columns.
    let datatype = to_column_data_type(&column.column_schema.data_type).with_context(|| {
        InvalidRequestSnafu {
            region_id,
            reason: format!(
                "no protobuf type for column {} ({:?})",
                column.column_schema.name, column.column_schema.data_type
            ),
        }
    })?;

    // Insert default value to each row.
    for row in &mut self.rows.rows {
        row.values.push(proto_value.clone());
    }

    // Insert column schema and keep the name index in sync with it, so the
    // column is no longer considered missing afterwards.
    let index = self.rows.schema.len();
    self.rows.schema.push(ColumnSchema {
        column_name: column.column_schema.name.clone(),
        datatype: datatype as i32,
        semantic_type: to_proto_semantic_type(column.semantic_type) as i32,
    });
    self.name_to_index
        .insert(column.column_schema.name.clone(), index);

    Ok(())
}
}
/// Sender and write request.